{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!pip3 install --upgrade --user kfp"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import kfp"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import kfp.dsl as dsl"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Use Kubeflow's built in Spark operator\n",
    "#tag::launch_operator[]\n",
    "resource = {\n",
    "    \"apiVersion\": \"sparkoperator.k8s.io/v1beta2\",\n",
    "    \"kind\": \"SparkApplication\",\n",
    "    \"metadata\": {\n",
    "        \"name\": \"boop\",\n",
    "        \"namespace\": \"kubeflow\"\n",
    "    },\n",
    "  \"spec\": {\n",
    "      \"type\": \"Python\",\n",
    "      \"mode\": \"cluster\",\n",
    "      \"image\": \"gcr.io/boos-demo-projects-are-rad/kf-steps/kubeflow/myspark\",\n",
    "      \"imagePullPolicy\": \"Always\",\n",
    "      \"mainApplicationFile\": \"local:///job/job.py\", # See the Dockerfile OR use GCS/S3/...\n",
    "      \"sparkVersion\": \"2.4.5\",\n",
    "      \"restartPolicy\": {\n",
    "        \"type\": \"Never\"\n",
    "      },\n",
    "  \"driver\": {\n",
    "    \"cores\": 1,  \n",
    "    \"coreLimit\": \"1200m\",  \n",
    "    \"memory\": \"512m\",  \n",
    "    \"labels\": {\n",
    "      \"version\": \"2.4.5\",  \n",
    "    },      \n",
    "    \"serviceAccount\": \"spark-operatoroperator-sa\", # also try spark-operatoroperator-sa\n",
    " },\n",
    "  \"executor\": {\n",
    "    \"cores\": 1,\n",
    "    \"instances\": 2,\n",
    "    \"memory\": \"512m\"  \n",
    "  },    \n",
    "  \"labels\": {\n",
    "    \"version\": \"2.4.5\"\n",
    "  },      \n",
    "  }\n",
    "}\n",
    "\n",
    "@dsl.pipeline(\n",
    "    name=\"local Pipeline\",\n",
    "    description=\"No need to ask why.\"\n",
    ")\n",
    "def local_pipeline():\n",
    "\n",
    "    rop = dsl.ResourceOp(\n",
    "        name=\"boop\",\n",
    "        k8s_resource=resource,\n",
    "        action=\"create\",\n",
    "        success_condition=\"status.applicationState.state == COMPLETED\"\n",
    "    )\n",
    "#end::launch_operator[]\n",
    "\n",
    "import kfp.compiler as compiler\n",
    "\n",
    "compiler.Compiler().compile(local_pipeline,\"boop.zip\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "client = kfp.Client()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "my_experiment = client.create_experiment(name='boop-test-2')\n",
    "my_run = client.run_pipeline(my_experiment.id, 'boop-test', \n",
    "  'boop.zip')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.9"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
